参考文章: Flask源码分析(一) , Flask源码分析(二) , Flask源码分析(三)
WSGI 在说到wsgi时我们先看一下面向http的python程序需要关心的内容:
请求:
请求方法(method)
请求地址(url)
请求内容(body)
请求头(header)
请求环境(environ)
响应:
响应码(status_code)
响应数据(data)
响应头(header)
wsgi要做的就是关于程序端和服务端的标准规范, 将服务程序接收到的请i去传递给python程序, 并将网络的数据流和python的结构体进行转换. 它规定了python程序必须是一个可调用对象(实现了call 函数的方法或类), 接受两个参数environ(WSGI的环境信息)和start_response(开始响应请求的函数), 并返回可迭代的结果. 直接上代码来实现一个最简单的web程序返回hello world:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from werkzeug.serving import run_simpleclass WebClass : def __init__ (self) : pass def __call__ (self, environ, start_response) : status = '200 OK' response_headers = [('Content-type' , 'text/plain' )] start_response(status, response_headers) yield str.encode("Hello World!\n" ) if __name__ == "__main__" : app = WebClass() run_simple("127.0.0.1" , 5000 , app)
WebClass正是实现了__call__
方法的可调用对象, 接受environ和start_respone, 并在返回之前调用start_response, start_response接受两个必须的参数, status_code(http状态码)和response_header(响应头), yield hello world正是要求的可迭代结果, 现在这个类只是实现了最简单的功能, 路由注册, 模板渲染等都没有实现.这里用了werkzeug提供的run_simple, 其实我们创建flask应用, 跑起来的时候调用的也是这个函数,后面将会讲到。
项目运行 app.run()
, 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 def run (self, host=None, port=None, debug=None, load_dotenv=True, **options) : if os.environ.get("FLASK_RUN_FROM_CLI" ) == "true" : from .debughelpers import explain_ignored_app_run explain_ignored_app_run() return if get_load_dotenv(load_dotenv): cli.load_dotenv() if "FLASK_ENV" in os.environ: self.env = get_env() self.debug = get_debug_flag() elif "FLASK_DEBUG" in os.environ: self.debug = get_debug_flag() if debug is not None : self.debug = bool(debug) _host = "127.0.0.1" _port = 5000 server_name = self.config.get("SERVER_NAME" ) sn_host, sn_port = None , None if server_name: sn_host, _, sn_port = server_name.partition(":" ) host = host or sn_host or _host port = int(next((p for p in (port, sn_port) if p is not None ), _port)) options.setdefault("use_reloader" , self.debug) options.setdefault("use_debugger" , self.debug) options.setdefault("threaded" , True ) cli.show_server_banner(self.env, self.debug, self.name, False ) from werkzeug.serving import run_simple try : run_simple(host, port, self, **options) finally : self._got_first_request = False
上面函数的功能很简单,处理了以下参数,其中最主要的还是调用 werkzeug 的 run_simple 函数, run_simple 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 def run_simple ( hostname, port, application, use_reloader=False, use_debugger=False, use_evalex=True, extra_files=None, reloader_interval=1 , reloader_type="auto" , threaded=False, processes=1 , request_handler=None, static_files=None, passthrough_errors=False, ssl_context=None, ) : if not isinstance(port, int): raise TypeError("port must be an integer" ) if use_debugger: from .debug import DebuggedApplication application = DebuggedApplication(application, use_evalex) if static_files: from .middleware.shared_data import SharedDataMiddleware application = SharedDataMiddleware(application, static_files) def log_startup (sock) : display_hostname = hostname if hostname not in ("" , "*" ) else "localhost" quit_msg = "(Press CTRL+C to quit)" if sock.family == af_unix: _log("info" , " * Running on %s %s" , display_hostname, quit_msg) else : if ":" in display_hostname: display_hostname = "[%s]" % display_hostname port = sock.getsockname()[1 ] _log( "info" , " * Running on %s://%s:%d/ %s" , "http" if ssl_context is None else "https" , display_hostname, port, quit_msg, ) def inner () : try : fd = int(os.environ["WERKZEUG_SERVER_FD" ]) except (LookupError, ValueError): fd = None srv = make_server( hostname, port, application, threaded, processes, request_handler, passthrough_errors, ssl_context, fd=fd, ) if fd is None : log_startup(srv.socket) srv.serve_forever() if use_reloader: if not is_running_from_reloader(): if port == 0 and not can_open_by_fd: raise ValueError( "Cannot bind to a random port with enabled " "reloader if the Python interpreter does " "not support socket opening by fd." ) address_family = select_address_family(hostname, port) server_address = get_sockaddr(hostname, port, address_family) s = socket.socket(address_family, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) s.bind(server_address) if hasattr(s, "set_inheritable" ): s.set_inheritable(True ) if can_open_by_fd: os.environ["WERKZEUG_SERVER_FD" ] = str(s.fileno()) s.listen(LISTEN_QUEUE) log_startup(s) else : s.close() if address_family == af_unix: _log("info" , "Unlinking %s" % server_address) os.unlink(server_address) from ._reloader import run_with_reloader run_with_reloader(inner, extra_files, reloader_interval, reloader_type) else : inner()
run_simple 中最后调用了其中的 inner 函数,inner 中的 make_server 函数返回一个 WSGIServer 对象,然后再调用 WSGIServer 对象的 serve_forever 方法,创建 wsgi 的服务, 然后运行,这样项目就运行起来了。
请求处理 前面说了wsgi规定应用程序必须实现__call__
方法, 找到Flask对应的内容:
1 2 def __call__ (self, environ, start_response) : return self.wsgi_app(environ, start_response)
wsgi_app 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def wsgi_app (self, environ, start_response) : ctx = self.request_context(environ) error = None try : try : ctx.push() response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except : error = sys.exc_info()[1 ] raise return response(environ, start_response) finally : if self.should_ignore_error(error): error = None ctx.auto_pop(error)
full_dispatch_request
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 def full_dispatch_request (self) : self.try_trigger_before_first_request_functions() try : request_started.send(self) rv = self.preprocess_request() if rv is None : rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) return self.finalize_request(rv)
这段最核心的就是 dispatch_request
, dispatch_request
就是我们注册的路由函数的执行结果, 在 dispatch_request
之前我们看到 preprocess_request
, 它的作用是将钩子函数处理一下
第一次请求处理之前的钩子函数, 通过 before_first_request
定义的
每个请求处理之前的钩子函数, 通过 before_request
定义的
dispatch_request 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def dispatch_request (self) : req = _request_ctx_stack.top.request if req.routing_exception is not None : self.raise_routing_exception(req) rule = req.url_rule if (getattr(rule, "provide_automatic_options" , False ) and req.method == "OPTIONS" ): return self.make_default_options_response() return self.view_functions[rule.endpoint](**req.view_args)
而在 dispat_request
之后还有 finalize_request
函数, 它的作用同样是将请求结果通过钩子函数处理一下:
每个请求正常处理之后的 hook 函数,通过 after_request
定义
不管请求是否异常都要执行的 teardown_request
hook 函数 所以上面最重要的就是 dispatch_request
函数, 找到我们注册的路由函数, 并返回
preprocess_request
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def preprocess_request (self) : bp = _request_ctx_stack.top.request.blueprint funcs = self.url_value_preprocessors.get(None , ()) if bp is not None and bp in self.url_value_preprocessors: funcs = chain(funcs, self.url_value_preprocessors[bp]) for func in funcs: func(request.endpoint, request.view_args) funcs = self.before_request_funcs.get(None , ()) if bp is not None and bp in self.before_request_funcs: funcs = chain(funcs, self.before_request_funcs[bp]) for func in funcs: rv = func() if rv is not None : return rv
url_value_preprocessors 是一个字典,用于存储预处理请求 URL 中的值的函数。这些函数可以是全局的,也可以是与特定蓝图(Blueprint)关联的。当一个请求进入应用时,Flask 需要根据请求的蓝图确定需要执行哪些预处理函数。
考虑以下情况:您可能在应用级别注册了一些全局的 url_value_preprocessors,同时在特定的蓝图上也注册了一些预处理函数。在处理请求时,您可能希望先执行全局的预处理函数,然后再执行与当前蓝图关联的预处理函数。
这就是 funcs = chain(funcs, self.url_value_preprocessors[bp]) 这句代码的作用。它使用 itertools.chain 函数,将全局的预处理函数和与当前蓝图关联的预处理函数合并成一个生成器。生成器会依次返回这些函数,让 Flask 在适当的时候调用它们来预处理请求 URL 中的值。
要注意的是,itertools.chain 并不会立即执行这些函数,而是在迭代过程中依次执行它们。这样可以确保在请求处理之前按照合适的顺序执行所有的预处理函数。
finalize_request
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def finalize_request (self, rv, from_error_handler=False) : response = self.make_response(rv) try : response = self.process_response(response) request_finished.send(self, response=response) except Exception: if not from_error_handler: raise self.logger.exception( "Request finalizing failed with an error while handling an error" ) return response
process_response
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 def process_response (self, response) : ctx = _request_ctx_stack.top bp = ctx.request.blueprint funcs = ctx._after_request_functions if bp is not None and bp in self.after_request_funcs: funcs = chain(funcs, reversed(self.after_request_funcs[bp])) if None in self.after_request_funcs: funcs = chain(funcs, reversed(self.after_request_funcs[None ])) for handler in funcs: response = handler(response) if not self.session_interface.is_null_session(ctx.session): self.session_interface.save_session(self, ctx.session, response) return response
响应 make_response
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 def make_response (self, rv) : status = headers = None if isinstance(rv, tuple): len_rv = len(rv) if len_rv == 3 : rv, status, headers = rv elif len_rv == 2 : if isinstance(rv[1 ], (Headers, dict, tuple, list)): rv, headers = rv else : rv, status = rv else : raise TypeError( "The view function did not return a valid response tuple." " The tuple must have the form (body, status, headers)," " (body, status), or (body, headers)." ) if rv is None : raise TypeError( "The view function did not return a valid response. The" " function either returned None or ended without a return" " statement." ) if not isinstance(rv, self.response_class): if isinstance(rv, (text_type, bytes, bytearray)): rv = self.response_class(rv, status=status, headers=headers) status = headers = None elif isinstance(rv, dict): rv = jsonify(rv) elif isinstance(rv, BaseResponse) or callable(rv): try : rv = self.response_class.force_type(rv, request.environ) except TypeError as e: new_error = TypeError( "{e}\nThe view function did not return a valid" " response. The return type must be a string, dict, tuple," " Response instance, or WSGI callable, but it was a" " {rv.__class__.__name__}." .format(e=e, rv=rv) ) reraise(TypeError, new_error, sys.exc_info()[2 ]) else : raise TypeError( "The view function did not return a valid" " response. The return type must be a string, dict, tuple," " Response instance, or WSGI callable, but it was a" " {rv.__class__.__name__}." .format(rv=rv) ) if status is not None : if isinstance(status, (text_type, bytes, bytearray)): rv.status = status else : rv.status_code = status if headers: rv.headers.extend(headers) return rv
路由匹配 在 flask 中, 构建路由规则有两种方法, 这两种方法其实是一样的,都是调用 add_url_rule
来实现
通过`@app.route ()`的装饰器, 上面例子用的就是这种方法
通过app.add_url_rule
, 这个方法的签名为 add_url_rule(self, rule, endpoint=None, view_func=None, **options)
1 2 3 4 5 6 def route (self, rule, **options) : def decorator (f) : endpoint = options.pop("endpoint" , None ) self.add_url_rule(rule, endpoint, f, **options) return f return decorator
add_url_rule
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @setupmethod def add_url_rule ( self, rule, endpoint=None, view_func=None, provide_automatic_options=None, **options ) : if endpoint is None : endpoint = _endpoint_from_view_func(view_func) options["endpoint" ] = endpoint methods = options.pop("methods" , None ) if methods is None : methods = getattr(view_func, "methods" , None ) or ("GET" ,) if isinstance(methods, string_types): raise TypeError( "Allowed methods have to be iterables of strings, " 'for example: @app.route(..., methods=["POST"])' ) methods = set(item.upper() for item in methods) required_methods = set(getattr(view_func, "required_methods" , ())) if provide_automatic_options is None : provide_automatic_options = getattr( view_func, "provide_automatic_options" , None ) if provide_automatic_options is None : if "OPTIONS" not in methods: provide_automatic_options = True required_methods.add("OPTIONS" ) else : provide_automatic_options = False methods |= required_methods rule = self.url_rule_class(rule, methods=methods, **options) rule.provide_automatic_options = provide_automatic_options self.url_map.add(rule) if view_func is not None : old_func = self.view_functions.get(endpoint) if old_func is not None and old_func != view_func: raise AssertionError( "View function mapping is overwriting an " "existing endpoint function: %s" % endpoint ) self.view_functions[endpoint] = view_func
可以发现这个函数主要做的就是更新 app 的 url_map 和 view_functions 这两个变量.查找定义, 发现 url_map 是 werkzeug.routing 的Map 类对象, rule 是 werkzeug.routing 的 Rule 类对象, view_functions 就是一个字典, 从上我们也可以知道每个视图函数的 endpoint 必须是不同的.也可以发现, flask 的核心路由逻辑其实实在 werkzeug 中实现的。
dispatch_request
源码如下,在这里面将路由和要处理的函数结合起来,实现执行视图函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def dispatch_request (self) : req = _request_ctx_stack.top.request if req.routing_exception is not None : self.raise_routing_exception(req) rule = req.url_rule if ( getattr(rule, "provide_automatic_options" , False ) and req.method == "OPTIONS" ): return self.make_default_options_response() return self.view_functions[rule.endpoint](**req.view_args)
上下文 之前在上面我们已经讲到dispatch_request函数在找到view_function后, 只是将最基本的参数传给了view_function, 可是有时这对视图函数来说是远远不够的, 它有时还需要头部(header), body里的数据, 才能正确运行, 可能 最简单的方法就是将所有的这些信息封装成一个对象, 作为参数传给视图函数, 可是这样一来所有的视图函数都需要添加对应的参数, 即使并没有用到它.
flask 的做法是把这些信息作为上下文, 类似全局变量的东西, 在需要的时候, 用 from flask import request 获取, 比如经常用的request.json, request.args, 这里有一个很重要的点就是它们必须是动态的, 在多线程或多协程的情况下, 每个线程或协程获取的必须是自己独特的对象, 不能导入后结果获取的是其他请求的内容, 那就乱套了.
那么flask是如何实现不同的线程协程准确获得自己的上下文的呢, 我们先来看一下这两个上下文的定义:
application context
演化成出两个变量 current_app
和 g
request context
演化出 request
和 session
他们的实现正式依靠 Local Stack
和 Local Proxy
类, 正是这两个东西才让我们在并发程序中每个视图函数都会看到属于自己的上下文而不会混乱, 而这两个类能在多线程或多协程情况下实现隔离效果是考了另一个基础类 Local
, 实现了类似threading.local的效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def _lookup_req_object (name) : top = _request_ctx_stack.top if top is None : raise RuntimeError(_request_ctx_err_msg) return getattr(top, name) def _lookup_app_object (name) : top = _app_ctx_stack.top if top is None : raise RuntimeError(_app_ctx_err_msg) return getattr(top, name) def _find_app () : top = _app_ctx_stack.top if top is None : raise RuntimeError(_app_ctx_err_msg) return top.app _request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack() current_app = LocalProxy(_find_app) request = LocalProxy(partial(_lookup_req_object, "request" )) session = LocalProxy(partial(_lookup_req_object, "session" )) g = LocalProxy(partial(_lookup_app_object, "g" ))
为什么不直接使用 LocalStack,而是使用 LocalProxy 呢?
主要原因在于 LocalProxy
的设计目标是为了提供更加简洁和方便的访问方式,使开发人员能够以属性访问的方式轻松地获取当前线程的上下文数据。LocalProxy
可以用来简化代码,并让您的代码更具可读性和可维护性。通过使用 LocalProxy
,您可以避免在多个地方重复引用 LocalStack
实例。
Local 代码如下:
__storage__
是用于存储内容的地方,格式为 {“线程ID”: {“key”:”value”}},因为每个请求线程 ID 不一样,所以 Local 实现了隔离的效果。__ident_func__
绑定的是 get_ident
函数,作用是获取当前的线程 ID。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class Local (object) : __slots__ = ("__storage__" , "__ident_func__" ) def __init__ (self) : object.__setattr__(self, "__storage__" , {}) object.__setattr__(self, "__ident_func__" , get_ident) def __iter__ (self) : return iter(self.__storage__.items()) def __call__ (self, proxy) : """Create a proxy for a name.""" return LocalProxy(self, proxy) def __release_local__ (self) : self.__storage__.pop(self.__ident_func__(), None ) def __getattr__ (self, name) : try : return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __setattr__ (self, name, value) : ident = self.__ident_func__() storage = self.__storage__ try : storage[ident][name] = value except KeyError: storage[ident] = {name: value} def __delattr__ (self, name) : try : del self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name)
Local 是用来提供多线程或多协程的隔离属性访问的, 那么 Local Stack 就提供了隔离的栈访问, 它只要提供了 push, pop, top方法, 主要是栈的一些方法,在 LocalStack 的 push 方法中, 其实是对属性_local
也就是 Local 的操作, 也就是先创建一个列表, self._local.storage[ident(当前线程或协程id)]['stack'] = []
, 然后其实还是用 append 将 request 请求信息添加进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class LocalStack (object) : def __init__ (self) : self._local = Local() def __release_local__ (self) : self._local.__release_local__() def _get__ident_func__ (self) : return self._local.__ident_func__ def _set__ident_func__ (self, value) : object.__setattr__(self._local, '__ident_func__' , value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) del _get__ident_func__, _set__ident_func__ def __call__ (self) : def _lookup () : rv = self.top if rv is None : raise RuntimeError('object unbound' ) return rv return LocalProxy(_lookup) def push (self, obj) : rv = getattr(self._local, 'stack' , None ) if rv is None : self._local.stack = rv = [] rv.append(obj) return rv def pop (self) : stack = getattr(self._local, 'stack' , None ) if stack is None : return None elif len(stack) == 1 : release_local(self._local) return stack[-1 ] else : return stack.pop() @property def top (self) : try : return self._local.stack[-1 ] except (AttributeError, IndexError): return None
上述已经将 Local 和 LocalStack 讲的差不多了。request ,g 的实现,用的是 LocalProxy,LocalProxy 还重写了所有的魔术方法,具体实现都是代理对象,LocalProxy 简要代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @implements_bool class LocalProxy (object) : __slots__ = ('__local' , '__dict__' , '__name__' , '__wrapped__' ) def __init__ (self, local, name=None) : object.__setattr__(self, '_LocalProxy__local' , local) object.__setattr__(self, '__name__' , name) if callable(local) and not hasattr(local, '__release_local__' ): object.__setattr__(self, '__wrapped__' , local) def _get_current_object (self) : if not hasattr(self.__local, '__release_local__' ): return self.__local() try : return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError('no object bound to %s' % self.__name__)
_request_ctx_stack
代表的就是请求上下文,ctx 其实是 RequestContext,请求来的时候会调用 RequestContext 的 push 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def push (self) : app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else : self._implicit_app_ctx_stack.append(None ) if hasattr(sys, "exc_clear" ): sys.exc_clear() _request_ctx_stack.push(self) if self.session is None : session_interface = self.app.session_interface self.session = session_interface.open_session(self.app, self.request) if self.session is None : self.session = session_interface.make_null_session(self.app) if self.url_adapter is not None : self.match_request() def pop (self, exc=_sentinel) : app_ctx = self._implicit_app_ctx_stack.pop() try : clear_request = False if not self._implicit_app_ctx_stack: self.app.do_teardown_request(exc) request_close = getattr(self.request, 'close' , None ) if request_close is not None : request_close() clear_request = True finally : rv = _request_ctx_stack.pop() if clear_request: rv.request.environ['werkzeug.request' ] = None if app_ctx is not None : app_ctx.pop(exc) def auto_pop (self, exc) : if self.request.environ.get('flask._preserve_context' ) or \ (exc is not None and self.app.preserve_context_on_exception): self.preserved = True self._preserved_exc = exc else : self.pop(exc)
push 就是将该请求的 application context (如果 _app_ctx_stack 栈顶不是当前请求所在 app,需要重新创建 app context )和 request context 都保存到相关的栈上, pop 则相反, 做一些出栈清理操作。
现在上下文就比较清楚了,就是每次有请求过来,flask 会创建当前线程或协程需要处理的两个上下文,并压入隔离的栈。application context针对的是 flask实 例的, 因为 app 实例只有一个, 所以多个请求其实是公用一个 application context, 而 request context 是每次请求过来都要创建的,在请求结束时又出栈, 所以两个的生命周期时不同的, 也就是 application context 的周期就是实例的生命周期, 而request context 的生命周期取决于请求存在的时间。
flask: app.run() 时, 并没有看到哪里启动了多线程, 理论上在单线程的情况下, 只有一个请求处理完成之后才能处理下一个请求, 那么上面为什么能同时处理多个请求, 哪里创建了多线程呢? 在flask的启动函数(run)中, 有这么一个源码:
1 options.setdefault("threaded" , True )
有了 LocalStack 为什么还需要 LocalProxy?LocalProxy 起到了什么作用?
LocalStack
是 Flask 中用于在每个线程中管理上下文数据的工具。它在多线程环境下为每个线程维护一个独立的数据栈,确保数据隔离和线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 my_stack = LocalStack() def worker (data) : my_stack.push(data) print("Data pushed:" , my_stack.top) my_stack.pop() import threadingthreads = [] for i in range(3 ): t = threading.Thread(target=worker, args=(f"Thread-{i} " ,)) threads.append(t) t.start() for t in threads: t.join()
LocalProxy
是 Flask 中的一个代理对象,用于访问 LocalStack
中的数据。它提供了一种简洁的方式来访问当前线程的上下文数据,而不需要显式地引用 LocalStack
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 my_stack = LocalStack() current_data = LocalProxy(my_stack) def worker (data) : my_stack.push(data) print("Data pushed:" , current_data) my_stack.pop() import threadingthreads = [] for i in range(3 ): t = threading.Thread(target=worker, args=(f"Thread-{i} " ,)) threads.append(t) t.start() for t in threads: t.join()
在上述示例中,LocalStack
用于在每个线程中管理数据栈,而 LocalProxy
则是在访问数据时提供了简洁的方式。通过将 LocalStack
包装为 LocalProxy
,您可以通过属性访问的方式来获取当前线程的上下文数据,而无需明确引用 LocalStack
。
总结:LocalStack
用于管理数据栈的隔离和线程安全,而 LocalProxy
则是提供了方便的访问方式,使您能够轻松地从当前线程的上下文中获取数据。
异常处理 在 Flask 中,异常处理主要涉及以下几个核心模块和类:
werkzeug.exceptions
模块 :Werkzeug 是 Flask 的底层库,werkzeug.exceptions
模块定义了许多常见的 HTTP 异常类,如 HTTPException
、BadRequest
、NotFound
等。这些异常类用于表示 HTTP 请求和响应中的不同状态码和错误情况。
flask
模块中的异常类 :在 Flask 的 flask
模块中,有一些自定义的异常类,如 FlaskException
和 RequestRedirect
等。这些异常类用于处理与 Flask 特定功能和行为相关的错误。
应用和蓝图中的错误处理装饰器 :在 Flask 应用中,您可以使用装饰器 `@app.errorhandler (Exception)` 来为特定异常注册处理函数,从而实现自定义的异常处理逻辑。同样,在蓝图中也有类似的机制。
**@app.errorhandler 源码分析**
`@app.errorhandler ` 装饰器是 Flask 提供的一种机制,用于为特定的异常类型注册自定义的错误处理函数。当异常被抛出并且与装饰器中指定的异常类型匹配时,注册的错误处理函数会被调用。让我们来分析一下这个装饰器的源码以及它是如何工作的。
在 Flask 中,`@app.errorhandler 装饰器的定义位于
flask.app` 模块中。以下是大致的源码结构和解释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class Flask : def errorhandler (self, code_or_exception) : def decorator (f) : self._register_error_handler(code_or_exception, f) return f return decorator def _register_error_handler (self, code_or_exception, f) : self._error_handlers[code_or_exception] = f def handle_exception (self, e) : for code_or_exception, handler in self._error_handlers.items(): if isinstance(e, code_or_exception): return handler(e) return None
`@app.errorhandler 是
Flask类的一个方法,它接受一个异常类型或者状态码作为参数。它返回一个装饰器函数
decorator`。
decorator
函数接受一个处理函数 f
,并在内部调用 _register_error_handler
方法将处理函数与异常类型或状态码注册到 _error_handlers
字典中。
_register_error_handler
方法将异常类型或状态码作为键,处理函数作为值,注册到 _error_handlers
字典中。
当应用中抛出异常时,handle_exception
方法会被调用。它会遍历 _error_handlers
字典,查找与异常类型匹配的处理函数,并调用该处理函数来处理异常。
是怎么触发错误处理的?
在 wsgi_app 中,如果 full_dispatch_request
调用失败,则会调用 handle_exception 进行错误处理,在 handle_exception 中,会根据当前的 error 去寻找对应的注册的回调方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def handle_exception (self, e) : exc_type, exc_value, tb = sys.exc_info() got_request_exception.send(self, exception=e) if self.propagate_exceptions: if exc_value is e: reraise(exc_type, exc_value, tb) else : raise e self.log_exception((exc_type, exc_value, tb)) server_error = InternalServerError() server_error.original_exception = e handler = self._find_error_handler(server_error) if handler is not None : server_error = handler(server_error) return self.finalize_request(server_error, from_error_handler=True )
信号 Flask 信号机制的核心是 blinker
库,它提供了一种用于实现事件触发和响应的简单方法。blinker
是一个独立的 Python 库,被 Flask 内部用作信号机制的基础。
以下是 Flask 中信号的基本用法:
导入 blinker
模块 :在 Flask 中,您可以通过导入 flask.signals
模块来获得 blinker
的实例。
创建信号对象 :使用 flask.signals.Namespace
类创建信号对象。例如:
1 2 3 4 from flask.signals import Namespacesignals = Namespace() my_signal = signals.signal("my-signal" )
连接信号和回调函数 :使用 .connect
方法将信号与回调函数关联起来。
1 2 3 @my_signal.connect def my_signal_handler (sender, **kwargs) : print("Signal received!" )
触发信号 :在适当的时候,通过调用信号对象的 .send
方法来触发信号。您可以在应用中的某个地方触发信号,然后相关的回调函数会被执行。
在 Flask 中,有一些内置的信号可以用于不同的事件,例如 request_started
、request_finished
、template_rendered
等。您还可以自定义信号来实现特定的事件响应。
信号机制在 Flask 中的应用范围广泛,它可以用于以下场景:
在请求处理开始和结束时执行一些操作。
在模板渲染完成后执行特定的操作。
在应用启动、关闭或重载时触发自定义操作。
在扩展或插件中进行事件监听和处理。